---
title: "Streaming Import"
sidebar_label: "Streaming Import"
description: "Streaming data into Convex"
---

import Tabs from "@theme/Tabs";
import TabItem from "@theme/TabItem";

Convex supports streaming import. Convex provides a connector implementation for
[Airbyte](/production/integrations/streaming-import-export.md). Those connectors
use the following APIs.

Streaming import support is automatically enabled for all Convex projects.

Streaming import requests require deployment admin authorization via the HTTP
header `Authorization`. The value is `Convex <access_key>` where the access key
comes from "Deploy key" on the Convex dashboard and gives full read and write
access to your Convex data.

### Headers

Streaming import endpoints accept a `Convex-Client: streaming-import-<version>`
header, where the version follows [Semver](https://semver.org/) guidelines. If
this header is not specified, Convex will default to the latest version. We
recommend using the header to ensure the consumer of this API does not break as
the API changes.

### GET `/api/streaming_import/primary_key_indexes_ready`

The `primary_key_indexes_ready` endpoint takes a list of table names and returns
true if the primary key indexes (created by `add_primary_key_indexes`) on all
those tables are ready. If the tables are newly created, the indexes should be
ready immediately; however if there are existing documents in the tables, it may
take some time to backfill the primary key indexes. The response looks like:

```json
{
  "indexesReady": true
}
```

### PUT `/api/streaming_import/add_primary_key_indexes`

The `add_primary_key_indexes` endpoint takes a JSON body containing the primary
keys for tables and creates indexes on the primary keys to be backfilled. Note
that they are not immediately ready to query - the `primary_key_indexes_ready`
endpoint needs to be polled until it returns True before calling
`import_airbyte_records` with records that require primary key indexes. Also
note that Convex queries will not have access to these added indexes. These are
solely for use in `import_airbyte_records`. The body takes the form of a map of
index names to list of field paths to index. Each field path is represented by a
list of fields that can represent nested field paths.

```json
{
  "indexes": {
    "<table_name>": [["<field1>"], ["<field2>", "<nested_field>"]]
  }
}
```

Expected API Usage:

1. Add indexes for primary keys by making a request to
   `add_primary_key_indexes`.
2. Poll `primary_key_indexes_ready` until the response is true.
3. Query using the added indexes.

### PUT `api/streaming_import/clear_tables`

The `clear_tables` endpoint deletes all documents from the specified tables.
Note that this may require multiple transactions. If there is an intermediate
error only some documents may be deleted. The JSON body to use this API request
contains a list of table names:

```json
{
  "tableNames": ["<table_1>", "<table_2>"]
}
```

### POST `api/streaming_import/replace_tables`

This endpoint is no longer supported. Use `api/streaming_import/clear_tables`
instead.

The `replace_tables` endpoint renames tables with temporary names to their final
names, deleting any existing tables with the final names.

The JSON body to use this API request contains a list of table names:

```json
{
  "tableNames": { "<table_1_temp>": "<table_1>", "<table_2_temp>": "<table_2>" }
}
```

### POST `api/streaming_import/import_airbyte_records`

The `import_airbyte_records` endpoint enables streaming ingress into a Convex
deployment and is designed to be called from an Airbyte destination connector.

It takes a map of streams and a list of messages in the JSON body. Each stream
has a name and JSON schema that will correspond to a Convex table. Streams where
records should be deduplicated include a primary key as well, which is
represented as a list of lists of strings that are field paths. Records for
streams without a primary key are appended to tables; records for streams with a
primary key replace an existing record where the primary key value matches or
are appended if there is no match. If you are using primary keys, you must call
the `add_primary_key_indexes` endpoint first and wait for them to backfill by
polling `primary_key_indexes_ready`.

Each message contains a stream name and a JSON document that will be inserted
(or replaced, in the case of deduplicated sync) into the table with the
corresponding stream name. Table names are same as the stream names. Airbyte
records become Convex documents.

```json
{
   "tables": {
      "<stream_name>": {
         "primaryKey": [["<field1>"], ["<field2>", "<nested_field>"]],
         "jsonSchema": // see https://json-schema.org/ for examples
      }
   },
   "messages": [{
      "tableName": "<table_name>",
      "data": {} // JSON object conforming to the `json_schema` for that stream
   }]
}
```

Similar to `clear_tables`, it is possible to execute a partial import using
`import_airbyte_records` if there is a failure after a transaction has
committed.

Expected API Usage:

1. [Optional] Add any indexes if using primary keys and
   [deduplicated sync](https://docs.airbyte.com/understanding-airbyte/connections/incremental-deduped-history/)
   (see `add_primary_key_indexes` above).
2. [Optional] Delete all documents in specified tables using `clear_tables` if
   using
   [overwrite sync](https://docs.airbyte.com/understanding-airbyte/connections/full-refresh-overwrite).
3. Make a request to `import_airbyte_records` with new records to sync and
   stream information.
